package org.databandtech.flinkstreaming;

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.databandtech.flinkstreaming.Entity.EpgVod;

/**
 * Window function that counts the {@link EpgVod} elements collected in a
 * window for one key and emits a single {@code (sys, area, count)} tuple.
 *
 * <p>The window key is split on {@code "-"}: the first segment is emitted as
 * {@code sys} and the second as {@code area}. Any further segments are ignored.
 */
public class KafkaAreaFuntion extends ProcessWindowFunction<
												EpgVod,                  // input type
												Tuple3<String, String, Integer>,  // output type
												String,                         // key type
												TimeWindow>{

	private static final long serialVersionUID = 1445186410808342302L;

	/** Separator between the segments of the window key. */
	private static final String KEY_SEPARATOR = "-";

	/**
	 * Counts the elements of the window and emits one result tuple.
	 *
	 * @param key     window key; its first two {@code "-"}-separated segments
	 *                become the first two fields of the output tuple
	 * @param context window context (not used)
	 * @param in      elements assigned to this window for {@code key}
	 * @param out     collector receiving the single {@code (sys, area, count)} tuple
	 * @throws Exception declared by the overridden method; a {@code null} key
	 *                   results in a {@link NullPointerException}
	 */
	@Override
	public void process(String key,
			ProcessWindowFunction<EpgVod, Tuple3<String, String, Integer>, String, TimeWindow>.Context context,
			Iterable<EpgVod> in, Collector<Tuple3<String, String, Integer>> out) throws Exception {

		// Split once. A key without a separator (or consisting only of
		// separators) yields fewer than two segments; fall back to an empty
		// string instead of failing with ArrayIndexOutOfBoundsException.
		String[] parts = key.split(KEY_SEPARATOR);
		String sys = parts.length > 0 ? parts[0] : "";
		String area = parts.length > 1 ? parts[1] : "";

		// Every element in this call belongs to the same key, so a plain
		// counter is sufficient; only the number of elements matters.
		int count = 0;
		for (EpgVod ignored : in) {
			count++;
		}

		out.collect(new Tuple3<String, String, Integer>(sys, area, count));
	}

}
